/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.dao;

import com.chinamobile.cmss.lakehouse.common.dto.PageBean;
import com.chinamobile.cmss.lakehouse.common.dto.SearchMetadataTableDto;
import com.chinamobile.cmss.lakehouse.common.dto.TableDto;
import com.chinamobile.cmss.lakehouse.common.enums.ExecuteTaskType;
import com.chinamobile.cmss.lakehouse.dao.entity.HiveTableEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.QHiveTableEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.QTableTaskInfoEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.TableTaskInfoEntity;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import com.google.common.base.Preconditions;
import com.querydsl.core.QueryResults;
import com.querydsl.core.Tuple;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class HiveTableCustomDaoImpl implements HiveTableCustomDao {

    @PersistenceContext
    private EntityManager em;

    public PageBean<TableDto> searchTable(Set<String> databases, SearchMetadataTableDto searchMetadataTableDto) {
        Preconditions.checkNotNull(searchMetadataTableDto);
        log.info("searchTable([{}])", databases.stream().collect(Collectors.joining(",")));
        JPAQueryFactory factory = new JPAQueryFactory(em);
        return getTableDtoPageDto(databases, searchMetadataTableDto, factory);
    }

    protected PageBean<TableDto> getTableDtoPageDto(Set<String> databases, SearchMetadataTableDto searchMetadataTableDto, JPAQueryFactory factory) {
        JPAQuery<Tuple> query = factory.select(QHiveTableEntity.hiveTableEntity, QTableTaskInfoEntity.tableTaskInfoEntity)
            .from(QHiveTableEntity.hiveTableEntity)
            .leftJoin(QTableTaskInfoEntity.tableTaskInfoEntity)
            .on(QHiveTableEntity.hiveTableEntity
                .tableName
                .eq(QTableTaskInfoEntity.tableTaskInfoEntity.tableName)
                .and(QHiveTableEntity.hiveTableEntity.hiveDatabaseEntity.name.eq(QTableTaskInfoEntity.tableTaskInfoEntity.dbName)))
            .where(QHiveTableEntity
                .hiveTableEntity
                .hiveDatabaseEntity
                .name
                .in(databases.stream().map(String::toLowerCase)
                    .collect(Collectors.toList())));
        return getTableDtoPageDtoInternal(databases, searchMetadataTableDto, query);
    }

    protected PageBean<TableDto> getTableDtoPageDtoInternal(Set<String> databases, SearchMetadataTableDto searchMetadataTableDto, JPAQuery<Tuple> query) {
        Integer[] seconds = searchMetadataTableDto.toSeconds();
        if (seconds != null) {
            query.where(QHiveTableEntity.hiveTableEntity.createTime.between(seconds[0], seconds[1]));
        }
        if (StringUtils.isNotEmpty(searchMetadataTableDto.getTableName())) {
            query.where(QHiveTableEntity.hiveTableEntity.tableName.likeIgnoreCase(searchMetadataTableDto.getTableName()));
        }
        if (StringUtils.isNotEmpty(searchMetadataTableDto.getDatabaseName())) {
            query.where(QHiveTableEntity.hiveTableEntity.hiveDatabaseEntity.name.likeIgnoreCase(searchMetadataTableDto.getDatabaseName()));
        }
        if (StringUtils.isNotEmpty(searchMetadataTableDto.getRefTaskId())) {
            query.where(QTableTaskInfoEntity.tableTaskInfoEntity.taskId.like(searchMetadataTableDto.getRefTaskId()));
        }
        if (StringUtils.isNotEmpty(searchMetadataTableDto.getRefTaskType())) {
            ExecuteTaskType taskType = ExecuteTaskType.forValues(searchMetadataTableDto.getRefTaskType().trim());
            query.where(QTableTaskInfoEntity.tableTaskInfoEntity.taskType.eq(taskType));
        }
        int start = (searchMetadataTableDto.getPageNo() - 1) * searchMetadataTableDto.getPageSize();
        query.offset(start).limit(searchMetadataTableDto.getPageSize());
        QueryResults<Tuple> queryResults = query.orderBy(QTableTaskInfoEntity.tableTaskInfoEntity.createTime.desc()).fetchResults();
        PageBean<TableDto> pageDto = new PageBean<>();
        pageDto.setLimit((int) queryResults.getLimit());
        pageDto.setTotalPage((int) queryResults.getTotal());
        pageDto.setResultList(queryResults
            .getResults()
            .stream()
            .map(tup -> {
                HiveTableEntity hiveTable = tup.get(0, HiveTableEntity.class);
                TableTaskInfoEntity tableTaskInf = tup.get(1, TableTaskInfoEntity.class);
                TableDto tableDto = new TableDto();
                tableDto.setTableName(hiveTable.getTableName());
                tableDto.setDatabaseName(hiveTable.getHiveDatabaseEntity().getName());
                //外部表显示路径 内部表不显示路径
                final String externalTableType = "EXTERNAL_TABLE";
                if (hiveTable.getTableType().equals(externalTableType)) {
                    Optional.ofNullable(hiveTable.getSdsEntity())
                        .ifPresent(sds ->
                            tableDto.setLocation(sds.getLocation()));
                } else {
                    tableDto.setLocation("");
                }
                tableDto.setCreator(hiveTable.getOwner());
                tableDto.setCreateTime(hiveTable.getCreateTime());
                if (tableTaskInf != null) {
                    tableDto.setRefTaskType(tableTaskInf.getTaskType());
                    tableDto.setRefTaskId(tableTaskInf.getTaskId());
                    tableDto.setLastModifier(tableTaskInf.getCreator());
                    tableDto.setLastModifierTime(tableTaskInf.getCreateTime());
                } else {
                    tableDto.setRefTaskType(ExecuteTaskType.UNKNOWN);
                    tableDto.setRefTaskId("-");
                    tableDto.setLastModifier("-");
                    tableDto.setLastModifierTime(-1);
                }
                return tableDto;
            }).collect(Collectors.toList()));
        return pageDto;
    }

}